package com.developer.sinktest

import com.developer.apitest.SensorReading
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

object RedisSinkTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    //source
    val streamFromFile = env.readTextFile("D:\\idea\\projects\\other\\flink-demo\\src\\main\\resources\\sensor.txt")

    //transform
    val dataStream = streamFromFile.map( data => {
      val dataArray = data.split(",")
      SensorReading( dataArray(0).trim,dataArray(1).trim.toLong,dataArray(2).trim.toDouble)
    })

    val conf  = new FlinkJedisPoolConfig.Builder()
        .setHost("localhost")
        .setPort(6379)
        .build()



    //sink

    dataStream.addSink( new RedisSink(conf,new MyRedisMapper()))

    env.execute("redis sink test")
  }
}

class MyRedisMapper() extends RedisMapper[SensorReading]{

  //定义保存数据到 redis 的命令
  override def getCommandDescription: RedisCommandDescription = {
    //把传感器id和温度保存成哈希表 HSET
    new RedisCommandDescription( RedisCommand.HSET,"sensor_temperature")
  }

  //定义保存到redis的value
  override def getKeyFromData(t: SensorReading): String = t.temperature.toString

  //定义保存到redis的key
  override def getValueFromData(t: SensorReading): String = t.id
}
